Pipe 管道支持双向通信

Pipe 管道是通过 socket 实现的

在管道中数据是不安全的因为管道中没有锁的机制,多个进程修改同一块数据时,就会导致数据的混乱处理方式上锁

在管道中如果两个进程同时抢占一个资源的时候,在 Mac 和 Linux 就会报错,处理方式上锁

注意:没有被进程使用到的管道需要被关闭

1. Pipe管道的基本使用

from multiprocessing import Pipe


p1, p2 = Pipe()  # 返回值:两个对象,可以理解为两条管道


p1.send('hello')  # 管道p1发送了数据


print(p2.recv())  # hello 接收p1发送过来的数据,不需要传字节数


p2.send('world')  # 管道p2发送了数据


p2.close()  # 关闭 p2 管道


print(p1.recv())  # world 接收p2发送过来的数据,p2管道关闭了还可以接受到数据是因为数据是在关闭管道之前发送的,此时的数据已经存在管道里面了
print(p1.recv())  # EOFError 报错,当p2管道关闭了且里面没有任何数据那么就会报错,一般都会使用报错执行 try 来结束程序

2. 使用管道实现进程之间的通讯

from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock


def fun(son):
    while True:
        try:
            print(son.recv())  # 接收主进程发送过来的数据
        except EOFError:
            son.close()  # 关闭子进程的管道
            break


if __name__ == '__main__':
    foo, son = Pipe()
    p = Process(target=fun, args=(son,))  # 将其中一个管道传递给子进程进行通信
    p.start()
    son.close()  # 将主进程中没有用到的管道关闭掉

    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')

    foo.close()  # 数据发送完后关闭主进程的管道

3. 使用管道实现生产者消费者模型

# 生产者消费者模型1

import time
from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock


# 生产者
def func1(foo, l):
    for i in range(10):
        foo.send(i) # 发送数据
    foo.close()  # 关闭管道


# 消费者
def func2(son, l):
    while True:
        try:
            l.acquire()  # 加锁
            time.sleep(0.1)
            print(son.recv()) # 接收生产者发送过来的数据
            l.release()  # 解锁
        except EOFError:
            l.release()  # 要在这个地方进行解锁,因为上面没有解锁就报错了,如果不在这里解锁就会进行阻塞
            son.close()  # 关闭管道
            break


if __name__ == '__main__':
    foo, son = Pipe()
    l = Lock()

  # 生产者进程
    f1 = Process(target=func1, args=(foo, l))
    f1.start()
    f2 = Process(target=func1, args=(foo, l))
    f2.start()
    f3 = Process(target=func1, args=(foo, l))
    f3.start()

    # 消费者进程
    s1 = Process(target=func2, args=(son, l))
    s1.start()
    s2 = Process(target=func2, args=(son, l))
    s2.start()
    s3 = Process(target=func2, args=(son, l))
    s3.start()
    s4 = Process(target=func2, args=(son, l))
    s4.start()

    foo.close()  # 关闭主进程没有使用到的管道
    son.close()  # 关闭主进程没有使用到的管道

# 生产者消费者模型2

import time
from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock


# 生产者
def producer(produce, n):
    for i in range(n):
        produce.send(i)  # 发送数据
    produce.send(None)  # 发送数据
    produce.send(None)  # 发送数据
    produce.close()  # 关闭管道


# 消费者
def consumer(consume, name, lock):
    while True:
        lock.acquire()  # 加锁
        baozi = consume.recv()  # 接收生产者发送过来的数据 
        lock.release()  # 解锁
        if baozi:
            time.sleep(0.1)
            print('%s 吃了包子:%s' % (name, baozi))
        else:
            consume.close()  # 关闭管道
            break


if __name__ == '__main__':
    produce, consume = Pipe()
    lock = Lock()
# 生产者进程
    p1 = Process(target=producer, args=(produce, 10))
    p1.start()

 # 消费者进程
    c1 = Process(target=consumer, args=(consume, 'Kevin', lock))
    c2 = Process(target=consumer, args=(consume, 'Yeung', lock))
    c1.start()
    c2.start()

    produce.close()  # 关闭主进程没有使用到的管道
    consume.close()  # 关闭主进程没有使用到的管道

    p1.join()
    c1.join()
    c2.join()
    print('主进程')